Lab 2 - Spark SQL

This lab will show you how to work with Spark SQL.

Step 1

Getting started: Create a SQLContext

Type:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)


In [1]:
#Create the SQLContext

Step 2

Download a JSON recordset to work with

Let's download the data. We can run commands on the console of the server (or Docker image) that the notebook environment is using. To do so, we simply put a "!" in front of the command that we want to run. For example:

!pwd

To get the data, we will download a file to the environment. Simply run these two commands; the first just ensures that the file is removed if it already exists:

!rm -f world_bank.json.gz
!wget https://raw.githubusercontent.com/bradenrc/sparksql_pot/master/world_bank.json.gz


In [6]:
#enter the commands to remove and download file here

Step 3

Create a DataFrame

Now you can create the DataFrame. Note that if you want to see where you downloaded the file, you can run !pwd or !ls.

To create the DataFrame, type:

example1_df = sqlContext.read.json("world_bank.json.gz")


In [3]:
#create the Dataframe here:

We can look at the schema with this command:

example1_df.printSchema()


In [4]:
#print out the schema

DataFrames work like RDDs: you can map, reduce, group by, and so on.
Take a look at the first two rows of data using "take".
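
For example, one possible sketch (assuming the DataFrame is named example1_df, as above) returns the first two rows as a list of Row objects:

example1_df.take(2)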


In [5]:
#Use take on the dataframe to pull out 2 rows

Step 4

Register a table

Using DataFrameObject.registerTempTable("name_of_table"), create a table named "world_bank".
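
For example, one possible answer, using the DataFrame created in the previous step:

example1_df.registerTempTable("world_bank")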


In [9]:
#Create the table to be referenced via SQL

Step 5

Writing SQL Statements

Using SQL, get the first two records. sqlContext.sql("SQL statement") will return a DataFrame with the matching records.
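
For example, one possible query (assuming the table was registered as "world_bank" in the previous step):

# first2_df is just an illustrative name for the two-row result DataFrame
first2_df = sqlContext.sql("select * from world_bank limit 2")
first2_df.show()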


In [1]:
#Use SQL to select from table limit 2 and print the output

In [2]:
#Extra credit, take the Dataframe you created with the two records and convert it into Pandas
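# One possible sketch, assuming the two-record DataFrame from the sketch above is named first2_df:
first2_pandas = first2_df.toPandas()
print(first2_pandas)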

In [3]:
#Now Calculate a Simple count based on a group, for example "regionname"
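# One possible sketch, grouping by "regionname" as suggested above:
sqlContext.sql("select count(*) as Count, regionname from world_bank group by regionname").show()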

In [1]:
# With JSON data you can reference the nested data
# If you look at Schema above you can see that Sector.Name is a nested column
# Select that column and limit to reasonable output (like 2)
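# One possible sketch, assuming the schema shows a nested sector.Name field as noted above:
sqlContext.sql("select sector.Name from world_bank limit 2").show()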

Step 6

Creating simple graphs

Using Pandas, we can create some simple visualizations.

First, create a SQL statement that returns a reasonable number of items.

For example, you can count the number of projects (rows) by countryname,
or in other words:
select count(*), countryname from world_bank group by countryname


In [2]:
# we need to tell the charting library (matplotlib) to display charts inline
# just run this cell
%matplotlib inline 
import matplotlib.pyplot as plt, numpy as np

In [3]:
# first write the sql statement and look at the data; remember to add .toPandas() to have it look nice
# an even easier option is to create a variable and set it to the SQL statement
# for example: 
# query = "select count(*) as Count, countryname from world_bank group by countryname"
# chart1_df = sqlContext.sql(query).toPandas()
# print(chart1_df)

In [4]:
# now take the variable (or same sql statement) and use the method:
# .plot(kind='bar', x='countryname', y='Count', figsize=(12, 5))
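# Putting the hints above together, one possible sketch:
query = "select count(*) as Count, countryname from world_bank group by countryname"
chart1_df = sqlContext.sql(query).toPandas()
chart1_df.plot(kind='bar', x='countryname', y='Count', figsize=(12, 5))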

Step 7

Creating a DataFrame "manually" by adding a schema to an RDD

First, we need to create an RDD of pairs or triplets. This can be done using code (for loop) as seen in the instructor's example, or more simply by assigning values to an array.


In [5]:
# Default array defined below. Feel free to change as desired.
array=[[1,1,1],[2,2,2],[3,3,3],[4,4,4],[5,5,5]]
my_rdd = sc.parallelize(array)
my_rdd.collect()


Out[5]:
[[1, 1, 1], [2, 2, 2], [3, 3, 3], [4, 4, 4], [5, 5, 5]]

First, use the StructField approach, following these steps:
1- Define your schema columns as a string
2- Build the schema object using StructField
3- Apply the schema object to the RDD

Note: The cell below is missing some code and will not run properly until the missing code has been completed.


In [ ]:
from pyspark.sql.types import *

# The schema is encoded in a string. Complete the string below
schemaString = ""

# MissingType() should be either StringType() or IntegerType(). Please replace as required.
fields = [StructField(field_name, MissingType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaExample = sqlContext.createDataFrame(use_your_rdd_name_here, schema)

# Register the DataFrame as a table. Add table name below as parameter to registerTempTable.
schemaExample.registerTempTable("")
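
For reference, here is one possible completion of the cell above. It assumes the default three-column integer array from the previous cell and an illustrative table name of "example_table"; adjust both to match your own data.

from pyspark.sql.types import *

# Three integer columns matching the [n, n, n] rows in my_rdd above
schemaString = "col1 col2 col3"

# IntegerType() because the sample array holds integers
fields = [StructField(field_name, IntegerType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD created earlier
schemaExample = sqlContext.createDataFrame(my_rdd, schema)

# Register the DataFrame as a table ("example_table" is just an illustrative name)
schemaExample.registerTempTable("example_table")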

In [ ]:
# Run some select statements on your newly created DataFrame and display the output
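# For example (assuming the illustrative table name "example_table" from the completed sketch above):
sqlContext.sql("select * from example_table").show()
sqlContext.sql("select col1, col3 from example_table where col2 > 2").show()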

In [ ]: